package raft

import (
	"fmt"
	"github.com/hashicorp/raft"
	raftboltdb "github.com/hashicorp/raft-boltdb"
	"github.com/pkg/errors"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"
)

// PeerInfo describes one member of the raft cluster.
type PeerInfo struct {
	ID      string // node identifier; converted to raft.ServerID when the cluster is bootstrapped
	Address string // node address; converted to raft.ServerAddress when the cluster is bootstrapped
}

// CommandType identifies the kind of operation carried by a command.
type CommandType uint8

// Supported command kinds. The numeric values are spelled out explicitly and
// must stay stable.
const (
	// CommandGet fetches an element.
	CommandGet = CommandType(0)
	// CommandPut adds a key/value pair; it fails if the key already exists.
	CommandPut = CommandType(1)
	// CommandDelete removes a key.
	CommandDelete = CommandType(2)
	// CommandInc atomically adds v to the value stored under k.
	CommandInc = CommandType(3)
)

// KVCommand is the key/value payload of a command. ApplyCommand fills it in
// and hands it to makeRaftLogCommand together with the CommandType.
type KVCommand struct {
	Key   int64
	Value int64
}

// RaftServer bundles a raft node with its FSM, its on-disk stores and its
// network transport. Build one with NewServer and bring it up with Start.
type RaftServer struct {
	id            string // local node ID; used as raft.Config.LocalID and in the data directory names
	fsm           *FSM   // state machine handed to raft.NewRaft; created in Start
	dir           string // base directory under which the log and KV directories live
	raftBootStrap bool   // stored by NewServer; NOTE(review): not read anywhere in this part of the file

	peers         []PeerInfo            // servers used as the initial configuration when bootstrapping
	raft          *raft.Raft            // nil until Start succeeds
	raftBind      string                // address the raft transport binds to
	raftStore     *raftboltdb.BoltStore // storage for the raft log; a separate database from the KV database
	raftTransport *raft.NetworkTransport
}

// NewServer builds a RaftServer for node id without starting it.
//
// The bind address is whatever raftAddressForID returns for id and peers.
// peers, dir and raftBootStrap are stored as given. Call Start to open the
// stores and create the raft instance.
func NewServer(id string, peers []PeerInfo, dir string, raftBootStrap bool) *RaftServer {
	bindAddr := raftAddressForID(id, peers)

	server := new(RaftServer)
	server.id = id
	server.peers = peers
	server.raftBind = bindAddr
	server.dir = dir
	server.raftBootStrap = raftBootStrap
	return server
}

// logPath returns the directory used for the raft log and snapshots of this
// node: "<dir>/log-<id>/", including the trailing slash.
func (s *RaftServer) logPath() string {
	return fmt.Sprintf("%s/log-%s/", s.dir, s.id)
}

// kvPath returns the directory handed to newFSM for this node's key/value
// data: "<dir>/kv-<id>/", including the trailing slash.
func (s *RaftServer) kvPath() string {
	return fmt.Sprintf("%s/kv-%s/", s.dir, s.id)
}

// Start opens the FSM and the raft storage and then creates the local raft
// node.
//
// In order it creates: the FSM at kvPath, a TCP transport bound to raftBind, a
// BoltDB store (used as both log store and stable store) plus a file snapshot
// store under logPath, and finally the raft instance itself. If no raft state
// exists yet, the cluster is bootstrapped with the peers held by the server.
//
// config may be nil, in which case a built-in default configuration is used.
// In either case config.Logger and config.LocalID are overwritten, so a
// caller-supplied config is modified.
//
// If Start fails, the BoltDB store and the TCP transport opened so far are
// closed again and their handles cleared, so the listening socket is released
// and a later retry does not close the same store twice.
func (s *RaftServer) Start(config *raft.Config) error {
	defer func() {
		// s.raft is only non-nil once raft.NewRaft has succeeded.
		if s.raft != nil {
			return
		}
		if s.raftStore != nil {
			if err := s.raftStore.Close(); err != nil {
				logging.Errorf("failed to close log storage: %v", err)
			}
			s.raftStore = nil
		}
		if s.raftTransport != nil {
			if err := s.raftTransport.Close(); err != nil {
				logging.Errorf("failed to close raft transport: %v", err)
			}
			s.raftTransport = nil
		}
	}()
	var err error
	s.fsm, err = newFSM(s.kvPath())
	if err != nil {
		logging.Errorf("newFSM, error:%s", err.Error())
		return err
	}

	addr, err := net.ResolveTCPAddr("tcp", s.raftBind)
	if err != nil {
		return err
	}

	trans, err := raft.NewTCPTransport(s.raftBind, addr, 10, 10*time.Second, os.Stderr)
	if err != nil {
		return err
	}

	// Record the transport immediately so the deferred cleanup can close it
	// if any later step fails.
	s.raftTransport = trans

	var logStore raft.LogStore
	var stable raft.StableStore
	var snap raft.SnapshotStore
	if err = ensureDir(s.logPath()); err != nil {
		return errors.Wrap(err, "failed to create log store directory")
	}

	// Create the backend raft store for logs and stable storage.
	s.raftStore, err = raftboltdb.NewBoltStore(filepath.Join(s.logPath(), "log.db"))
	if err != nil {
		return err
	}
	stable = s.raftStore

	// Wrap the store in a LogCache to improve performance.
	logStore, err = raft.NewLogCache(512, s.raftStore)
	if err != nil {
		return err
	}

	// Create the snapshot store.
	snap, err = raft.NewFileSnapshotStore(s.logPath(), 2, os.Stderr)
	if err != nil {
		return err
	}

	if config == nil {
		// Set default configuration for raft
		config = &raft.Config{
			ProtocolVersion:    raft.ProtocolVersionMax,
			HeartbeatTimeout:   500 * time.Millisecond,
			ElectionTimeout:    1000 * time.Millisecond,
			CommitTimeout:      50 * time.Millisecond,
			MaxAppendEntries:   64,
			ShutdownOnRemove:   true,
			TrailingLogs:       10240,
			SnapshotInterval:   120 * time.Second,
			SnapshotThreshold:  8192,
			LeaderLeaseTimeout: 500 * time.Millisecond,
		}
	}

	config.Logger = newAdapterLogger()
	config.LocalID = raft.ServerID(s.id)

	var hasState bool
	hasState, err = raft.HasExistingState(logStore, stable, snap)
	if err != nil {
		return err
	}

	// NOTE(review): the raftBootStrap flag is not consulted here; bootstrap
	// happens whenever no previous state exists. Confirm this is intended.
	if !hasState {
		configuration := raft.Configuration{}

		for _, v := range s.peers {
			peer := raft.Server{
				ID:      raft.ServerID(v.ID),
				Address: raft.ServerAddress(v.Address),
			}
			configuration.Servers = append(configuration.Servers, peer)
		}
		if err = raft.BootstrapCluster(config,
			logStore, stable, snap, trans, configuration); err != nil {
			return err
		}
	}

	s.raft, err = raft.NewRaft(config, s.fsm, logStore, stable, snap, trans)
	if err != nil {
		return err
	}

	logging.Debug("Raft server is starting")
	return nil
}

// JoinCluster adds the node nodeID at addr to the raft cluster as a voter.
//
// Only the leader may do this; on any other node an error is returned. If a
// server with exactly this ID and address is already part of the
// configuration, nothing is changed and nil is returned. A server that shares
// only the ID or only the address is removed first and the node is then added.
//
// It returns an error if the configuration cannot be read, if removing a
// conflicting server fails, or if adding the voter fails.
func (s *RaftServer) JoinCluster(nodeID, addr string) error {
	if !s.IsLeader() {
		return errors.New("only leader can join node to the cluster")
	}

	logging.Debugf("joining peer %s at %s to the cluster.", nodeID, addr)

	configFuture := s.raft.GetConfiguration()
	if err := configFuture.Error(); err != nil {
		logging.Errorf("failed to get raft configuration: %v", err)
		return err
	}

	id, address := raft.ServerID(nodeID), raft.ServerAddress(addr)
	for _, srv := range configFuture.Configuration().Servers {
		if srv.ID != id && srv.Address != address {
			continue
		}
		if srv.ID == id && srv.Address == address {
			logging.Debugf("node %s at %s is already the member of cluster.", nodeID, addr)
			return nil
		}

		// A node already exists with either the joining node's ID or its
		// address (but not both), so it has to be removed from the
		// configuration first.
		future := s.raft.RemoveServer(srv.ID, 0, 0)
		if err := future.Error(); err != nil {
			return errors.Wrapf(err, "error removing existing node %s at %s", nodeID, addr)
		}
	}

	// Report a failed AddVoter to the caller instead of claiming success.
	if err := s.raft.AddVoter(id, address, 0, 0).Error(); err != nil {
		logging.Errorf("AddVoter, NodeID:%s, Addr:%s fail, Error:%s", nodeID, addr, err.Error())
		return err
	}
	logging.Debugf("node %s at %s joined successfully", nodeID, addr)

	return nil
}

// IsLeader reports whether the local raft node is currently in the Leader
// state. It returns false when the raft instance has not been created, i.e.
// before Start has succeeded.
func (s *RaftServer) IsLeader() bool {
	if s.raft == nil {
		return false
	}
	return s.raft.State() == raft.Leader
}

// ApplyCommand encodes a command of type cmdType for key/value with
// makeRaftLogCommand, submits it through raft.Apply with a one-second timeout
// and returns the result produced by the FSM.
//
// It returns an error if this node is not the leader, if the command cannot
// be encoded, if raft reports an error for the apply, or if the FSM response
// is not a non-nil *FSMApplyResult. When the FSM result carries an error, both
// the result and that error are returned.
func (s *RaftServer) ApplyCommand(cmdType CommandType, key, value int64) (*FSMApplyResult, error) {
	if !s.IsLeader() {
		return nil, errors.New("this is not the leader node")
	}

	cmdLog, err := makeRaftLogCommand(cmdType, KVCommand{
		Key: key, Value: value,
	})
	if err != nil {
		return nil, err
	}

	future := s.raft.Apply(cmdLog, 1*time.Second)
	if err := future.Error(); err != nil {
		return nil, err
	}

	// Use a checked assertion: a nil or unexpected response must surface as
	// an error rather than a panic.
	resp := future.Response()
	result, ok := resp.(*FSMApplyResult)
	if !ok || result == nil {
		return nil, errors.Errorf("unexpected FSM response of type %T", resp)
	}
	return result, result.Error
}

// GetLeaderServiceAddress derives the leader's service address from its raft
// address by keeping the host part and adding one to the port, e.g.
// "10.0.0.1:8000" becomes "10.0.0.1:8001".
//
// The host and port are split on the last colon, so a bracketed IPv6 address
// such as "[::1]:8000" is handled as well. An empty string is returned when
// the leader address is empty, has no port, or has a non-numeric port.
func (s *RaftServer) GetLeaderServiceAddress() string {
	address := s.LeaderAddress()

	index := strings.LastIndex(address, ":")
	if index < 0 {
		// No leader address available, or it carries no port.
		return ""
	}

	port, err := strconv.Atoi(address[index+1:])
	if err != nil {
		return ""
	}
	return fmt.Sprintf("%s:%d", address[:index], port+1)
}

// LeaderAddress returns the address of the current leader as reported by
// raft.Leader(), converted to a plain string. It returns "" when the raft
// instance has not been created, i.e. before Start has succeeded.
func (s *RaftServer) LeaderAddress() string {
	if s.raft == nil {
		return ""
	}
	return string(s.raft.Leader())
}
